Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exactly-once guarantee for covering index incremental refresh #122

Conversation

dai-chen
Copy link
Collaborator

@dai-chen dai-chen commented Nov 1, 2023

Description

As proposed in issue below:

  1. Add a new option id_expression in create index statement: will use this expression as ID column
  2. If not provided by user, generate ID column based on source file path + timestamp/@timestamp column
  3. If neither works, log warning without ID column generated. In this case, if incremental refresh job restarts, there maybe duplicate data added to Flint index data in OpenSearch (depends on how much bulk request failed when crash earlier)

Documentation: https://github.com/dai-chen/opensearch-spark/blob/support-covering-index-idempotency/docs/index.md#create-index-options

Examples

 ### Case 1: Configure via id_expression option ### 

scala> spark.sql("""
CREATE INDEX clientip_and_status ON ds_tables.http_logs
(clientip, status)
WITH (
  auto_refresh = true,
  id_expression = 'uuid()'
)
""")

INFO FlintSparkCoveringIndex: Generate ID column based on expression Some(uuid())
INFO FlintSparkCoveringIndex: Building covering index by == Physical Plan ==
*(1) Project [clientip#242, status#244, uuid(Some(-169273435727191972)) AS __id__#257]
+- *(1) Scan ExistingRDD[@timestamp#241,clientip#242,request#243,status#244,size#245,year#246,month#247,day#248]

      {
        "_index": "flint_myglue_ds_tables_http_logs_clientip_and_status_index",
        "_id": "aae1a580-ee85-4b9b-b2cd-8f2b80255d9d",
        "_score": 1,
        "_source": {
          "clientip": "138.64.16.0",
          "status": 304
        }
      },


 ### Case 2: Auto detect timestamp column on table ### 

scala> spark.sql("""
CREATE INDEX clientip_and_status ON ds_tables.http_logs
(clientip, status)
WITH (
  auto_refresh = true
)
""")

INFO FlintSparkCoveringIndex: Generate ID column based on expression Some(sha1(concat(input_file_name(), @timestamp)))
INFO FlintSparkCoveringIndex: Building covering index by == Physical Plan ==
*(1) Project [clientip#58, status#60, sha1(cast(concat(input_file_name(), cast(@timestamp#57 as string)) as binary)) AS __id__#73]
+- *(1) Scan ExistingRDD[@timestamp#57,clientip#58,request#59,status#60,size#61,year#62,month#63,day#64]

      {
        "_index": "flint_myglue_ds_tables_http_logs_clientip_and_status_index",
        "_id": "9df0b4e2c2859ed178070148c44d546206e29ecd",
        "_score": 1,
        "_source": {
          "clientip": "33.33.9.0",
          "status": 304
        }
      },

 ### Case 3: No ID column otherwise ### 

scala> spark.sql("""
CREATE INDEX discount_and_quantity ON stream.lineitem_tiny
(l_discount, l_quantity)
WITH (
  auto_refresh = true
)
""")

WARN FlintSparkCoveringIndex: Cannot generate ID column which may cause duplicate data when restart
INFO FlintSparkCoveringIndex: Building covering index by == Physical Plan ==
*(1) Project [l_discount#395, l_quantity#393]
+- *(1) Scan ExistingRDD[l_orderkey#389L,l_partkey#390L,l_suppkey#391L,l_linenumber#392,l_quantity#393,l_extendedprice#394,l_discount#395,l_tax#396,l_returnflag#397,l_linestatus#398,l_commitdate#399,l_receiptdate#400,l_shipinstruct#401,l_shipmode#402,l_comment#403,l_shipdate#404]

      {
        "_index": "flint_myglue_stream_lineitem_tiny_discount_and_quantity_index",
        "_id": "u3_RkIsBZG4KSy0OyASy",
        "_score": 1,
        "_source": {
          "l_discount": 0.06,
          "l_quantity": 28
        }
      },

 Manual Refresh (Same Logic) 

scala> spark.sql("""
CREATE INDEX clientip_and_status ON ds_tables.http_logs
(clientip, status)
WITH (
  auto_refresh = false
)
""")

scala> spark.sql("REFRESH INDEX clientip_and_status ON ds_tables.http_logs")

      {
        "_index": "flint_myglue_ds_tables_http_logs_clientip_and_status_index",
        "_id": "ab3d192b77d05cd77d0f8b3436a3bf2932f760e8",
        "_score": 1,
        "_source": {
          "clientip": "143.29.0.0",
          "status": 304
        }
      },

Issues Resolved

#88

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen dai-chen added the enhancement New feature or request label Nov 1, 2023
@dai-chen dai-chen self-assigned this Nov 1, 2023
@dai-chen
Copy link
Collaborator Author

dai-chen commented Nov 9, 2023

Raising new PR #143 with auto detect timestamp/@timestamp logic removed

@dai-chen dai-chen closed this Nov 9, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant